ATOM Documentation

# Historical Sync Architecture

Historical sync backfills integration data (emails, calendar events, messages, etc.) into a dual-memory store for AI agent context: a structured graph (Postgres GraphNode/GraphEdge) and a semantic vector store (LanceDB with hybrid keyword+embedding search). It supports ingesting 3+ months of historical data with automatic worker scaling.

## Architecture Overview

┌──────────────┐     Redis Queue      ┌──────────────┐
│  Web Machine  │ ── enqueue ──────→  │ Worker VM #1  │ ── fetch ──→ Microsoft Graph
│  (Next.js +   │                     │  (2GB, 1xCPU) │              Google APIs
│   FastAPI)     │ ←── poll results ── │               │              Slack, etc.
└──────────────┘                     └──────────────┘
       │                                    │
       │ Fly Machines API                   │ Fly Machines API
       │ (start/stop/create)                │ (self-report idle)
       ▼                                    ▼
┌──────────────────────────────────────────────────┐
│              Fly.io Infrastructure                │
│  auto_stop_machines: true                        │
│  auto_start_machines: true                       │
│  Worker VMs: 1–3, 2GB each                      │
└──────────────────────────────────────────────────┘

## Dual-Memory Architecture

Each ingested record is written to two parallel memory stores:

| Memory Type | Storage | Contents | Used For |
|-------------|---------|----------|----------|
| Structured | Postgres GraphNode/GraphEdge | Entities + relationships via LLM + rule-based extraction | Knowledge graph queries |
| Semantic | LanceDB | Raw text + vector embeddings | Hybrid keyword + semantic similarity search |

### Per-Chunk Ingestion Flow (100 records)

For each record in chunk:
    |
    +---> _extract_structured_entities()  --> Postgres DiscoveredEntity
    |       * Rule-based from metadata (from, to, subject)
    |       * Linked to EntityTypeDefinition during schema discovery
    |       * Promoted to GraphNode on entity type activation
    |
    +---> graphrag.ingest_document()      --> Postgres GraphNode/GraphEdge
    |       * LLM extraction via tenant BYOK key
    |       * Entities: people, orgs, topics
    |       * Idempotent (content hash dedup)
    |
    +---> Collect text in chunk_texts[]

After chunk (batched):
    |
    +---> asyncio.gather(                    --> LanceDB (semantic memory)
    |       lancedb.add_document() x 100)    * 100 concurrent embedding API calls
    |                                         * ~2s total vs 20s sequential
    |                                         * Raw text + vector embeddings
    |                                         * Direct store, no transfer pipeline

### Why Batched?

Per-record lancedb.add_document() calls the OpenAI embedding API synchronously.

100 records × 200ms = 20s of blocking calls per chunk, which delays heartbeats and can trigger the reaper (15-minute heartbeat threshold, see Reaper below).

asyncio.gather() runs all 100 calls concurrently, completing in ~2s.
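
The difference can be sketched with a stand-in for the embedding call. This is a minimal illustration, not the service's code: `fake_embed` is a hypothetical placeholder that sleeps instead of calling a real API (with a shorter delay than the ~200ms cited above), and `embed_sequential` / `embed_batched` are illustrative names.

```python
import asyncio
import time

async def fake_embed(text: str, delay: float = 0.02) -> list[float]:
    """Hypothetical stand-in for one embedding API call (it only sleeps)."""
    await asyncio.sleep(delay)
    return [float(len(text))]

async def embed_sequential(texts: list[str]) -> list[list[float]]:
    # One call at a time: total time is roughly len(texts) * delay.
    return [await fake_embed(t) for t in texts]

async def embed_batched(texts: list[str]) -> list[list[float]]:
    # All calls in flight at once: total time is roughly one delay.
    # gather() returns results in the same order as its arguments.
    return await asyncio.gather(*(fake_embed(t) for t in texts))

def timed(coro) -> tuple[float, list]:
    """Run a coroutine to completion and return (elapsed seconds, result)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return time.perf_counter() - start, result
```

Calling `timed(embed_batched(texts))` and `timed(embed_sequential(texts))` on the same input should show the batched version finishing in roughly the time of a single call. If the embedding provider enforces rate limits, an `asyncio.Semaphore` around the call is one way to cap concurrency.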

### Memory Lifecycle

1. **Semantic memory**: LanceDB — raw text + embeddings stored directly during ingestion. Hybrid keyword + vector search. No transfer pipeline.

2. **Structured memory**: Postgres — GraphNode/GraphEdge for knowledge graph queries, DiscoveredEntity for pre-promotion staging. Permanent.

## Components

### 1. Job Lifecycle (core/historical_sync_service.py)

**States:** pending → running → completed / failed / paused / cancelled

- start_historical_sync() — validates plan tier, ACU quota, enqueues to Redis

- _process_sync_job() — background task: fetches records, extracts entities, persists to GraphRAG

- cancel_sync() — marks job cancelled; processing loop checks this on each chunk

- resume_sync() — re-enqueues to Redis worker queue from checkpoint

**Chunk processing (per chunk):**

1. Memory check (worker VMs only)

2. fetch_paginated_records() from integration

3. Entity/relationship extraction via _extract_structured_entities()

4. Persist to GraphRAG via ingest_structured_data()

5. Schema discovery for new entity types

6. Checkpoint save, heartbeat update, progress broadcast
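
The chunk loop above, including the per-chunk cancellation check and checkpointing, could look roughly like the following. This is a simplified sketch under stated assumptions: `SyncJob` is a hypothetical in-memory stand-in for a job row, `fetch_page` and `handle_record` are injected placeholders for the integration fetch and the extraction/persistence steps, and the `"page_token"` checkpoint key is invented for illustration. The memory check, schema discovery, and progress broadcast are omitted.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

@dataclass
class SyncJob:
    """Hypothetical in-memory stand-in for a historical_sync_jobs row."""
    status: str = "pending"
    records_processed: int = 0
    checkpoint_data: dict = field(default_factory=dict)
    heartbeats: int = 0  # counts heartbeat updates, for illustration only

def process_sync_job(
    job: SyncJob,
    fetch_page: Callable[[Optional[str]], tuple[list[dict], Optional[str]]],
    handle_record: Callable[[dict], None],
) -> SyncJob:
    """Process chunks until the source is exhausted or the job is cancelled."""
    job.status = "running"
    job.heartbeats += 1  # initial heartbeat before entering the loop
    page_token = job.checkpoint_data.get("page_token")  # resume point, if any
    while True:
        if job.status == "cancelled":  # cancel flag is checked on each chunk
            return job
        records, next_token = fetch_page(page_token)
        for record in records:
            handle_record(record)  # extraction + persistence would go here
        job.records_processed += len(records)
        job.checkpoint_data = {"page_token": next_token}  # checkpoint save
        job.heartbeats += 1  # heartbeat after each chunk
        if next_token is None:
            job.status = "completed"
            return job
        page_token = next_token
```

Because the checkpoint is written after every chunk, a cancelled or interrupted job can be picked up again from the last saved page token instead of starting over.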

### 2. Worker Queue (core/sync_job_queue.py)

Redis-backed priority queue with:

- **Sorted set** for priority + FIFO ordering

- **Job locking** (SETNX with 5-minute TTL) — prevents duplicate processing

- **Dead-letter queue** for failed jobs with retry support

- **Queue metrics** (depth, idle time) for autoscaling decisions
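
One common way to get priority plus FIFO ordering out of a single sorted set is to fold both into the score. The sketch below assumes a redis-py style client passed in as `client`. The key names (`sync:queue`, `sync:lock:...`), the `PRIORITY_WEIGHT` constant, and the convention that a lower priority number dequeues first are all assumptions for illustration, not taken from `sync_job_queue.py`.

```python
import time

PRIORITY_WEIGHT = 10**13  # assumed: larger than any millisecond Unix timestamp
LOCK_TIMEOUT = 300        # seconds, matching the 5-minute lock TTL

def queue_score(priority: int, enqueued_at: float) -> int:
    """Lower score dequeues first: priority dominates, timestamp breaks ties (FIFO)."""
    return priority * PRIORITY_WEIGHT + int(enqueued_at * 1000)

def enqueue(client, job_id: str, priority: int = 5) -> None:
    # ZADD into a sorted set; the lowest score is popped first (e.g. via ZPOPMIN).
    client.zadd("sync:queue", {job_id: queue_score(priority, time.time())})

def acquire_job_lock(client, job_id: str, worker_id: str) -> bool:
    # SET key value NX EX ttl: succeeds only if no other worker holds the lock.
    # The TTL releases the lock automatically if the holder dies.
    return bool(client.set(f"sync:lock:{job_id}", worker_id, nx=True, ex=LOCK_TIMEOUT))
```

`SET ... NX EX` is the single-command equivalent of SETNX followed by EXPIRE, and avoids leaving a lock without a TTL if the process dies between the two calls.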

### 3. Worker Process (workers/sync_worker.py)

- Polls Redis every 1 second via dequeue()

- Acquires job lock before processing

- Creates fresh DB session per job (SessionLocal()) — prevents connection leaks

- **Never self-shuts down** — polls forever, Fly.io manages VM lifecycle

- SIGTERM/SIGINT handlers for graceful shutdown
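
A poll loop with graceful shutdown along these lines might be structured as follows. This is a sketch, not the contents of `sync_worker.py`: `dequeue` and `process_job` are injected placeholder coroutines, and lock acquisition and per-job DB session handling are left out to keep the example short.

```python
import asyncio
import signal

POLL_INTERVAL = 1  # seconds between Redis polls

async def worker_loop(dequeue, process_job, stop: asyncio.Event,
                      poll_interval: float = POLL_INTERVAL) -> int:
    """Poll until `stop` is set; returns the number of jobs processed."""
    processed = 0
    while not stop.is_set():
        job_id = await dequeue()
        if job_id is None:
            # Queue empty: sleep, but wake early if shutdown is requested.
            try:
                await asyncio.wait_for(stop.wait(), timeout=poll_interval)
            except asyncio.TimeoutError:
                pass
            continue
        await process_job(job_id)  # the in-flight job finishes before exit
        processed += 1
    return processed

def install_signal_handlers(loop: asyncio.AbstractEventLoop,
                            stop: asyncio.Event) -> None:
    # SIGTERM/SIGINT only set the flag; the loop exits at its next check.
    # loop.add_signal_handler is available on Unix event loops.
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)
```

The loop never exits because the queue is idle; it stops only when a signal sets the event, which matches the "never self-shuts down" rule above.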

### 4. Autoscaling (core/sync_job_queue.py + core/startup_tasks.py)

**Scale-up** (on start_historical_sync):

- ensure_worker_running() checks if workers exist, starts stopped ones, or creates new

- If queue_depth > 5 and fewer than 3 workers are running → scale_up() → the Fly API creates a new 2GB worker VM

**Scale-down** (every 5 minutes via maintenance loop):

- autoscale_workers() checks idle time

- If idle for >= 5 minutes and more than 1 worker is running → scale_down() → destroys the newest worker

- **Always keeps at least 1 worker**

**Max capacity:** 3 concurrent workers, each processing 100 records/chunk
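
The scaling rules reduce to a small pure function. The sketch below merges the scale-up and scale-down checks into one hypothetical `scaling_decision()` helper for readability; in the description above they run at different times (on sync start and in the maintenance loop), and the real functions call the Fly Machines API rather than returning a string.

```python
SCALE_UP_QUEUE_DEPTH = 5      # scale up if more than 5 jobs are waiting
SCALE_DOWN_IDLE_MINUTES = 5   # scale down after 5+ idle minutes
MIN_WORKERS = 1               # always keep at least one worker
MAX_WORKERS = 3               # max concurrent workers

def scaling_decision(queue_depth: int, running_workers: int,
                     idle_minutes: float) -> str:
    """Return "scale_up", "scale_down", or "hold" for the current state."""
    if running_workers < MIN_WORKERS:
        return "scale_up"  # no worker at all: start or create one
    if queue_depth > SCALE_UP_QUEUE_DEPTH and running_workers < MAX_WORKERS:
        return "scale_up"
    if idle_minutes >= SCALE_DOWN_IDLE_MINUTES and running_workers > MIN_WORKERS:
        return "scale_down"
    return "hold"
```

For example, a queue depth of 6 with one worker yields `"scale_up"`, while a single idle worker always yields `"hold"`, however long it has been idle.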

### 5. Reaper (core/startup_tasks.py)

Runs every 5 minutes on the web machine:

- Finds jobs with status = "running" and last_heartbeat older than 15 minutes

- Marks them cancelled with "Abandoned (server restart or timeout)"

- Jobs set last_heartbeat before entering the processing loop (initial) and after each chunk
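
The reaper's selection rule can be expressed as a filter over job rows. In this sketch the jobs are plain dicts rather than database rows, `reap_abandoned_jobs` is an illustrative name, and storing the message in `last_error` is an assumption about where the text ends up.

```python
from datetime import datetime, timedelta

HEARTBEAT_TIMEOUT = timedelta(minutes=15)
ABANDONED_MESSAGE = "Abandoned (server restart or timeout)"

def reap_abandoned_jobs(jobs: list[dict], now: datetime) -> list[str]:
    """Cancel running jobs whose heartbeat is too old; return their ids."""
    reaped = []
    for job in jobs:
        if job["status"] != "running":
            continue  # only running jobs are candidates
        if now - job["last_heartbeat"] > HEARTBEAT_TIMEOUT:
            job["status"] = "cancelled"
            job["last_error"] = ABANDONED_MESSAGE  # assumed target column
            reaped.append(job["id"])
    return reaped
```

This is also why the initial heartbeat matters: a job that reaches `running` without one carries a stale or empty `last_heartbeat` and can be reaped before it finishes its first chunk.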

### 6. Integration Fetch (integrations/outlook_service.py)

- **Token refresh** via Microsoft OAuth endpoint (30s timeout)

- **Page fetch** via Microsoft Graph API with:

  - 30s aiohttp timeout

  - Retry on 504 Gateway Timeout (1 retry, 2s backoff)

  - Retry on asyncio.TimeoutError (1 retry, 2s backoff)

- **Compound page tokens** ("channel||nextLink") for email + calendar pagination

- **Lazy token refresh** — checks expiry before each page fetch
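
The retry policy and the compound page token format can be sketched independently of the HTTP client. The example below uses `asyncio.wait_for` in place of the aiohttp timeout, a hypothetical `GatewayTimeout` exception to represent a 504 response, and illustrative helper names. It re-raises after the final attempt, whereas the real service returns an error to the job.

```python
import asyncio
from typing import Awaitable, Callable, Optional

class GatewayTimeout(Exception):
    """Hypothetical exception representing an HTTP 504 response."""

async def fetch_with_retry(
    fetch: Callable[[], Awaitable[dict]],
    timeout: float = 30.0,
    retries: int = 1,
    backoff: float = 2.0,
) -> dict:
    """Call `fetch` with a timeout; retry on 504 or timeout, then re-raise."""
    attempt = 0
    while True:
        try:
            return await asyncio.wait_for(fetch(), timeout=timeout)
        except (GatewayTimeout, asyncio.TimeoutError):
            if attempt >= retries:
                raise  # retries exhausted: surface the error to the caller
            attempt += 1
            await asyncio.sleep(backoff)

def encode_page_token(channel: str, next_link: str) -> str:
    # One token carries both the channel (e.g. email or calendar) and its cursor.
    return f"{channel}||{next_link}"

def decode_page_token(token: Optional[str]) -> tuple[Optional[str], Optional[str]]:
    if not token:
        return None, None  # first page: no channel or cursor yet
    channel, _, next_link = token.partition("||")
    return channel, next_link
```

Splitting on the first `||` only keeps the cursor intact even if the nextLink URL happens to contain the same separator.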

## Timeouts and Resilience

| Operation | Timeout | Retry | Fallback |
|-----------|---------|-------|----------|
| MS Graph API page fetch | 30s | 1 (504/timeout) | Returns error to job |
| MS OAuth token refresh | 30s | None | Returns None → auth failure |
| Fly Machines API calls | 10s | None | Falls back to in-process |
| Reaper heartbeat check | 15 min | N/A | Cancels abandoned jobs |
| DB session pool timeout | 30s | N/A | Raises TimeoutError |

## Key Configuration

    # fly.toml
    [[vm]]
      memory = "2gb"
      cpus = 1
      memory_mb = 2048
      processes = ["worker"]

    [http_service]
      auto_stop_machines = true
      auto_start_machines = true

    # sync_job_queue.py
    SCALE_UP_QUEUE_DEPTH = 5       # Scale up if > 5 jobs waiting
    SCALE_DOWN_IDLE_MINUTES = 5    # Scale down if idle 5+ min
    LOCK_TIMEOUT = 300             # 5 min job lock TTL

    # sync_worker.py
    POLL_INTERVAL = 1              # Redis poll every 1 second

## Database Schema

historical_sync_jobs table:

- id (UUID) — primary key

- tenant_id (VARCHAR) — tenant isolation

- integration_id (VARCHAR) — e.g., "outlook", "slack"

- source_connection_id (VARCHAR) — UserConnection ID

- status — pending / running / completed / failed / paused / cancelled

- progress_percentage, records_processed

- entities_extracted, relationships_extracted

- last_heartbeat — updated before loop and after each chunk

- checkpoint_data (JSONB) — for resumability

- last_error, error_count, max_retries

## Common Failure Modes and Fixes

| Symptom | Likely Cause | Fix |
|---------|-------------|-----|
| Job stuck pending | Worker VM stopped, not restarted | ensure_worker_running() wakes it |
| Job running with 0/0 for minutes | Token refresh or Graph API hanging | 30s timeout + 504 retry |
| Job cancelled "Abandoned" | Reaper killed it (no heartbeat) | Initial heartbeat before loop |
| Job paused | Memory threshold on web machine | Memory check only on worker VMs |
| Job failed with auth error | Expired refresh token | Terminal failure, UI shows Reconnect |
| Worker self-stops mid-job | Old idle-shutdown bug | Removed self-shutdown entirely |